package com.qianjing.common.utils.kafka;

import com.qianjing.common.constant.KafkaConstants;
import com.qianjing.common.utils.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
//    @Value("${spring.kafka.security-protocol}")
//    private String kafkaSecurityProtocol;
//    @Value("${spring.kafka.sasl-mechanism}")
//    private String kafkaSASLMechanism;
//
//    @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
//    private String kafkaConsumerSASLJaasConfig;
//
//    @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
//    private String kafkaProducerSASLJaasConfig;
//
//
//    @Value("${spring.kafka.consumer.enable-auto-commit}")
//    private Boolean autoCommit;
//
//    @Value("${spring.kafka.consumer.auto-commit-interval}")
//    private Integer autoCommitInterval;
//
//    @Value("${spring.kafka.consumer.group-id}")
//    private String groupId;
//
//    @Value("${spring.kafka.consumer.max-poll-records}")
//    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    //同一组中的consumer不会读取到同一个消息,不会重复消费
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.producer.retries}")
    private String acks;

    /*******************************************生产者的配置*******************************/
    @Bean
    @ConfigurationProperties(prefix = "spring.kafka")
    public KafkaServers kafkaServers(){
        return new KafkaServers();
    }

    @Bean
    public KafkaAdminClient kafkaAdminClient(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaServers().getBootStrapServers()));
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//        if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism) && !StringUtils.isEmpty(kafkaProducerSASLJaasConfig)) {
//            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
//            props.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
//            props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProducerSASLJaasConfig);
//        }

        return (KafkaAdminClient) KafkaAdminClient.create(props);
    }

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // 消费者组中线程数量(消费者并发启动个数，最好跟kafka分区数量一致，不能超过分区数量)
        factory.setConcurrency(5);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(1000);
        //不自动启动
        factory.setAutoStartup(true);
        factory.setBatchListener(true);//设置为批量消费，每个批次数量在Kafka配置参数中设置
        //设置提交偏移量的方式， MANUAL_IMMEDIATE 表示消费一条提交一次；MANUAL表示批量提交一次
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }


    /*******************************************消费者的配置*******************************/
    /**
     * <p>
     *     注释一：有多个消费组监听，直接复制改方法
     *     注释二：修改 @Bean(name = "kafkaListenerContainerFactory") name重新命名
     *     注释三：factory.setConsumerFactory(consumerFactory("etctu_tradeinfolist_test")); 改下消费组 名称
     *     注释四：在调用的时候containerFactory="kafkaListenerContainerFactory" 改一下就ok
     * </p>
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> oneFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // 消费者组中线程数量(消费者并发启动个数，最好跟kafka分区数量一致，不能超过分区数量)
        factory.setConcurrency(5);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(1000);
        //不自动启动
        factory.setAutoStartup(true);
        factory.setBatchListener(false);//设置为批量消费，每个批次数量在Kafka配置参数中设置
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);//设置手动提交ackMode
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaServers().getBootStrapServers()));
        //配置默认分组，这里没有配置+在监听的地方没有设置groupId，多个服务会出现收到相同消息情况
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.TOPIC_GROUP2);
        // 是否自动提交offset偏移量(默认true)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //设置每次拉取最大数量(默认500 此处要保证消息的大小没有超出限制，否则一样拉不了这么多)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1000);
        // 自动提交的频率(ms)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置,服务端没有收到心跳就会认为当前消费者失效
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        // 键的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置：
        // (1)、earliest：当各分区下有已提交的offset时，从提交的offset开始消费；无提交的offset时，从头开始消费
        // (2)、latest：当各分区下有已提交的offset时，从提交的offset开始消费；无提交的offset时，消费新产生的该分区下的数据
        // (3)、none：topic各分区都存在已提交的offset时，从offset后开始消费；只要有一个分区不存在已提交的offset，则抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }
}
